package com.atguigu.gmall.realtime.app.func;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.bean.TableProcess;
import com.atguigu.gmall.realtime.common.GmallConfig;
import com.google.gson.JsonObject;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;

public class TableProcessFunction extends BroadcastProcessFunction<JSONObject,String, JSONObject> {
    //创建连接对象
    private Connection conn;

    //分流
    private OutputTag<JSONObject> dimOutputTag;
    //状态描述器.
    MapStateDescriptor<String, TableProcess> mapStateDescriptor;

    public TableProcessFunction(OutputTag<JSONObject> dimOutputTag, MapStateDescriptor<String, TableProcess> mapStatusDescriptor) {
        this.dimOutputTag = dimOutputTag;
        this.mapStateDescriptor = mapStatusDescriptor;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //注册驱动
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        //建立连接
        conn= DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    //TODO 1 处理主流业务数据，主要是将数据分流，分离事实表和维度表，通过传入的配置状态.
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //从状态中获取配置信息
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        //获取业务表名字
        String tableName = value.getString("table");
        //获取操作类型
        Object type = value.get("type");
        //maxwell的bootstrap接收的历史数据都是bootstrap-insert类型
        if(type.equals("bootstrap-insert")){
            type="insert";
            value.put("type",type);
        }
        //拼接状态中的key---->tableName+operter
        String key =tableName+":"+type;
        TableProcess tableProcess = broadcastState.get(key);
        if(tableProcess !=null){
            //获取数据信息
            JSONObject dataJsonObj = value.getJSONObject("data");
            //获取输入目的的表名或者主题
            value.put("sink_table",tableProcess.getSinkTable());

            //根据配置表中的column对数据字段进行过滤
            String sinkColumns = tableProcess.getSinkColumns();
            if(sinkColumns !=null && sinkColumns.length()>0){
                //传入待表的配置信息data 和表列信息
                filterColumn(dataJsonObj,sinkColumns);
            }
            //分流事实维度
            //维度写测输出liu
            if(tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_HBASE)){
                //维度
                ctx.output(dimOutputTag,value);
            }else if(tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA)){
                //事实表
              out.collect(value);
            }

        }else {
            System.out.println("No this Key in TableProcess:" + key);
        }


    }

    //根据配置表中的sink_column列信息，对数据进行字段的过滤
    private void filterColumn(JSONObject dataJsonObj, String sinkColumns) {
        //也就是说，当配置表中的列没有的，原来的表有的列会被滤掉
        String[] fieldArr = sinkColumns.split(",");
        List<String> fieldList = Arrays.asList(fieldArr);
        Set<Map.Entry<String, Object>> entrySet = dataJsonObj.entrySet();
        entrySet.removeIf(ele->!fieldList.contains(ele.getKey()));
    }

    //TODO 2 将广播流的配置表信息，存入状态.提供给主流使用来进行判断.(first先对配置数进行处理,处理后主流才可以使用配置表状态)
    //表的数据本来为json字符串.先将其转化为json对象.
    /*{
        "databases": "gmall1021_realtime",
            "data": {
        "operate_type": "insert",
                "sink_type": "kafka",
                "sink_table": "dwd_order",
                "source_table": "order_info"
    },
        "type": "create",
            "table": "table_process"
    }*/
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        //将字符串转化为对象
        JSONObject jsonObject = JSON.parseObject(value);
        //我们现在需要获取的就是，配置信息就是data
        String dataJsonStr = jsonObject.getString("data");
        //将dataJsonStr转化为TableProcess实体类对象,这样就可以获取配置表实体类对象的属性.
        TableProcess tableProcess = JSONObject.parseObject(dataJsonStr, TableProcess.class);
        //获取配置表中数据
        if(tableProcess != null){
            //获取表
            String sourceTable = tableProcess.getSourceTable();
            //获取操作c r u d类型
            String operateType = tableProcess.getOperateType();
            //获取标记信息，区分事实表和维度表hbase kafka
            String sinkType = tableProcess.getSinkType();
            //获取输出的表或者kafka主题
            String sinkTable = tableProcess.getSinkTable();
            //若是写入hbase，获取列信息
            String sinkColumns = tableProcess.getSinkColumns();
            //hbase parmarkey
            String sinkPk = tableProcess.getSinkPk();
            //hbase扩展信息
            String sinkExtend = tableProcess.getSinkExtend();
            //拼接一个key 作为配置状态信息的key,用表名和操作类型拼接 table+operate
            String key= sourceTable+ ":" +operateType;

            //将配置信息放进状态
            ctx.getBroadcastState(mapStateDescriptor).put(key,tableProcess);

            //判断是不是维度数据若不是维度数据可以提前在hbase中建表，否则后续在主流中插入数据，hbase没有对应的表
            //将报错
            if(TableProcess.SINK_TYPE_HBASE.equals(sinkType) && "insert".equals(operateType)){
                //表名，列，主键，扩展.
                checkTable(sinkTable,sinkColumns,sinkPk,sinkExtend);
                System.out.println(key);
            }
            System.out.println(key+"*********");



        }




    }

    private void checkTable(String tableName, String tableColumns, String pk, String extend) {
        if(pk == null){
            //一般指定将id指定为主键
            pk="id";
        }
        if(extend == null){
            extend="";
        }
        //拼接建表语句，不要string拼接，是因为每拼接一次，创建一个新的string对象，资源浪费
        StringBuilder createSql = new StringBuilder("create table if not exists " + GmallConfig.HBASE_SCHEMA +
                "." + tableName + "(");

        //对sinkColumn进行切分   得到一个个字段  id,tm_name
        String[] fieldsArr = tableColumns.split(",");
        for (int i = 0; i < fieldsArr.length; i++) {
            String field = fieldsArr[i];
            if (field.equals(pk)) {
                createSql.append(field + " varchar primary key");
            } else {
                createSql.append(field + " varchar ");
            }
            if (i < fieldsArr.length - 1) {
                createSql.append(",");
            }
        }
        createSql.append(")" + extend);
        System.out.println("建表语句为：" + createSql);

        //操作对象
        PreparedStatement ps =null;

        try {
            ps = conn.prepareStatement(createSql.toString());
            ps.execute();
        } catch (SQLException e) {
            e.printStackTrace();
            throw  new RuntimeException("在phoenix中建表失败");
        }finally {
            //释放资源，连接不是释放，操作对象释放
            if(ps != null){
                try {
                    ps.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}
